-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FIX] Issue #709: do not enqueue all responses #713
base: main
Are you sure you want to change the base?
Conversation
I'm not able to find requests which increase the queue size
|
Try |
tests/integration/it_utils.py
Outdated
# concurrent wallet_fund operations will attempt to advance the ledger at the same | ||
# time. Consequently, all the funding operations fail, except for the first one. | ||
# using a lock will serialize the access to this critical operation | ||
async with async_wallet_fund_lock: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mvadari I found that adding a lock to this section of the code eliminated the "Account Not Found" error.
|
tests/integration/reusable_values.py
Outdated
witness_wallet = Wallet.create() | ||
await fund_wallet_async(witness_wallet) | ||
await asyncio.gather( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please stick to one change per PR - this is unrelated to this PR.
How did you test this change? If not by adding a test case, then what? |
I don't have a good idea to test this change. But irrespective of the impact on this issue, I think this change is useful. I don't know which inputs would reproduce the issue description. |
…- Many thanks to https://github.com/Jbekker for pointing out the issue and the testcase for the same
@@ -0,0 +1,91 @@ | |||
"""Memory usage of Websocket clients.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be called test_websocket_client
and the description should be more general, as it may be used for other tests in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 3a53ffc
class TestAsyncWebsocketClient(IsolatedAsyncioTestCase): | ||
"""Memory usage of async-websocket client""" | ||
|
||
async def test_msg_queue_async_websocket_client( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test can still use the test_async_and_sync
framework - there's a websockets_only
parameter in the decoration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current version of the code uses test_async_and_sync
construct, thanks for your suggestion.
but I don't like this version. Due to the use of await client.request(...)
, the requests are fulfilled in a serialized fashion. The queue size will never exceed one in this test.
Ideally, here is what I'd like to do: The asyncio.create_task(...)
concurrently creates many requests. Now, we can verify that the responses are not logged into the queue.
class TestAsyncWebsocketClient(IntegrationTestCase):
"""Memory usage of async-websocket client"""
@test_async_and_sync(globals(), websockets_only=True)
async def test_msg_queue_async_websocket_client(self, client):
"""Test the rate of growth of the Message queue in async_websocket_client under
persistent load. Admittedly, this is not a precise measure, rather its a proxy
to measure the memory footprint of the client
"""
for _ in range(5):
asyncio.create_task(
client.request(
BookOffers(
ledger_index="current",
taker_gets=XRP(),
taker_pays=IssuedCurrency(
currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
),
limit=500,
)
)
)
asyncio.create_task(
client.request(
BookOffers(
ledger_index="current",
taker_gets=IssuedCurrency(
currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"
),
taker_pays=XRP(),
limit=500,
)
)
)
# messages corresponding to the above two requests might reside in the
# queue. If these requests have already been fulfilled, the queue size will be reduced by an appropriate amount
self.assertEqual(client._messages.qsize() <= 2, True)
# wait for some time, we want to ensure that all the requests are registered in
# the websocket client. asyncio library asynchronously populates the queue
await asyncio.sleep(2)
# the messages queue has not increased in proportion to the requests/responses
self.assertEqual(client._messages.qsize() <= 2, True)
I'm trying to accomodate asyncio.create_task
inside test_async_and_sync
decorator.
what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about using subscriptions? Can those stay in the queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not able to test the Subscribe
method correctly. Contrary to my understanding, this code print only one message and gets stuck. It does not indefinitely print the stream of messages from the other peer.
What am I missing? The structure of the code is similar to the tests here
class TestSubscriptionWebsocketClient(IntegrationTestCase):
@test_async_and_sync(globals(), websockets_only=True)
async def test_msg_queue_size(self, client):
req = Subscribe(streams=[StreamParameter.LEDGER])
# NOTE: this code will run forever without a timeout, until the process is killed
await client.send(req)
# Note: AsyncWebSocket clients need the "async for" construct for execution.
# Alternatively, we will need to implement __aiter__ method in the respective class.
async for message in client:
print(message)
print("Queue size: ")
print(client._messages.qsize())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you're connecting to a standalone node, you also need to close ledgers independently. A standalone node won't do that automatically for you. That's why there's the additional await accept_ledger_async()
in the for loop in the test.
Alternatively, you can add use_testnet=True
in the test_async_and_sync
decorator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might also be worth considering rewriting the use_testnet=True
parameter into something that'll also allow you to connect to mainnet (i.e. not a boolean, so you can connect to other things as well).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
got it, thanks.
Subscribe
does use the above message queue. Every response from the subscription stream adds one message, but that message is removed almost immediately. I don't know which method is popping out the messages, but the message queue is empty.
are you proposing that we use the Subscribe
request for the message queue tests? but, it is not representative of a request-response back-and-forth
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, fair. I was just thinking about commands that would legitimately grow the queue.
|
||
# enqueue the response for the message queue | ||
cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict) | ||
# a response that fulfills a future is not enqueued again |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is more confusing than useful IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 3a53ffc
class TestWebsocketClient(IntegrationTestCase): | ||
"""Memory usage of websocket client""" | ||
|
||
@test_async_and_sync(globals(), websockets_only=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is against standalone, not mainnet. The data isn't going to really test anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing with a Standalone mode provides a useful baseline, because the performance will only get worse with Mainnet/Testnet.
I'm measuring the memory usage of the client when multiple successive requests are input to the client. (Since no new transactions are provided, there is no need to advance the ledger)
I can set use_testnet=True
, I'm not opposed to that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current integration test framework does not easily let us test against the mainnet (unless we write additional utility methods). I'm assuming you're referring to the use_testnet
parameter in test_async_and_sync
decorator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The integration test framework can be changed, it's not set in stone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you suggesting that I test the performance against the mainnet? Why would that be different than testing with the testnet or the standalone node?
The performance of the xrpl-py client is in question, does it matter if I use a standalone/testnet/mainnet server for this test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There isn't enough traffic on testnet/standalone to test the performance of xrpl-py, it's not going to run into any issues. There may not even be any data in this pair (or any pair) for the few seconds that you're testing on. You could theoretically generate that traffic yourself, but that'd require a much more complex test and I'm not sure it's worth that level of complexity.
High Level Overview of Change
Do not enqueue messages that correspond to an existing request
Context of Change
Bug Fix: At the moment, we are enqueuing all the incoming messages.
Type of Change
Did you update CHANGELOG.md?
This change impacts the memory usage of the system, hence is not directly perceivable by end users.